import os
import os.path as p
import pwd
import re
import subprocess
import shutil
import distutils.dir_util
import socket
import time
import errno
from dicttoxml import dicttoxml
import xml.dom.minidom
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException

import docker
from docker.errors import ContainerError

from .client import Client, CommandRequest


HELPERS_DIR = p.dirname(__file__)
DEFAULT_ENV_NAME = 'env_file'


def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
    full_path = os.path.join(path, fname)
    with open(full_path, 'w') as f:
        for var, value in variables.items():
            f.write("=".join([var, value]) + "\n")
    return full_path

class ClickHouseCluster:
    """ClickHouse cluster with several instances and (possibly) ZooKeeper.

    Add instances with several calls to add_instance(), then start them with the start() call.

    Directories for instances are created in the directory of base_path. After cluster is started,
    these directories will contain logs, database files, docker-compose config, ClickHouse configs etc.
    """

    def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None,
                 zookeeper_config_path=None):
        self.base_dir = p.dirname(base_path)
        self.name = name if name is not None else ''

        self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
        self.server_bin_path = server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse')
        self.client_bin_path = client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client')
        self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path) if zookeeper_config_path else p.join(HELPERS_DIR, 'zookeeper_config.xml')

        self.project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
        # docker-compose removes everything non-alphanumeric from project names so we do it too.
        self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower())
        self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))

        self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
        self.base_zookeeper_cmd = None
        self.base_mysql_cmd = []
        self.base_kafka_cmd = []
        self.pre_zookeeper_commands = []
        self.instances = {}
        self.with_zookeeper = False
        self.with_mysql = False
        self.with_kafka = False

        self.docker_client = None
        self.is_up = False


    def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, hostname=None, env_variables={}):
        """Add an instance to the cluster.

        name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
        config_dir - a directory with config files which content will be copied to /etc/clickhouse-server/ directory
        main_configs - a list of config files that will be added to config.d/ directory
        user_configs - a list of config files that will be added to users.d/ directory
        with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
        """

        if self.is_up:
            raise Exception("Can\'t add instance %s: cluster is already up!" % name)

        if name in self.instances:
            raise Exception("Can\'t add instance `%s': there is already an instance with the same name!" % name)

        instance = ClickHouseInstance(
            self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper,
            self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname, env_variables=env_variables)

        self.instances[name] = instance
        self.base_cmd.extend(['--file', instance.docker_compose_path])
        if with_zookeeper and not self.with_zookeeper:
            self.with_zookeeper = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
            self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]

        if with_mysql and not self.with_mysql:
            self.with_mysql = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
            self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]

        if with_kafka and not self.with_kafka:
            self.with_kafka = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')])
            self.base_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')]

        return instance


    def get_instance_docker_id(self, instance_name):
        # According to how docker-compose names containers.
        return self.project_name + '_' + instance_name + '_1'


    def get_instance_ip(self, instance_name):
        docker_id = self.get_instance_docker_id(instance_name)
        handle = self.docker_client.containers.get(docker_id)
        return handle.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']


    def start(self, destroy_dirs=True):
        if self.is_up:
            return

        # Just in case kill unstopped containers from previous launch
        try:
            if not subprocess.call(['docker-compose', 'kill']):
                subprocess.call(['docker-compose', 'down', '--volumes'])
        except:
            pass

        if destroy_dirs and p.exists(self.instances_dir):
            print "Removing instances dir", self.instances_dir
            shutil.rmtree(self.instances_dir)

        for instance in self.instances.values():
            instance.create_dir(destroy_dir=destroy_dirs)

        self.docker_client = docker.from_env()

        if self.with_zookeeper and self.base_zookeeper_cmd:
            subprocess.check_call(self.base_zookeeper_cmd + ['up', '-d', '--no-recreate'])
            for command in self.pre_zookeeper_commands:
                self.run_kazoo_commands_with_retries(command, repeats=5)

        if self.with_mysql and self.base_mysql_cmd:
            subprocess.check_call(self.base_mysql_cmd + ['up', '-d', '--no-recreate'])

        if self.with_kafka and self.base_kafka_cmd:
            subprocess.check_call(self.base_kafka_cmd + ['up', '-d', '--no-recreate'])
            self.kafka_docker_id = self.get_instance_docker_id('kafka1')

        # Uncomment for debugging
        #print ' '.join(self.base_cmd + ['up', '--no-recreate'])

        subprocess.check_call(self.base_cmd + ['up', '-d', '--no-recreate'])

        start_deadline = time.time() + 20.0 # seconds
        for instance in self.instances.itervalues():
            instance.docker_client = self.docker_client
            instance.ip_address = self.get_instance_ip(instance.name)

            instance.wait_for_start(start_deadline)

            instance.client = Client(instance.ip_address, command=self.client_bin_path)


        self.is_up = True


    def shutdown(self, kill=True):
        if kill:
            subprocess.check_call(self.base_cmd + ['kill'])
        subprocess.check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])
        self.is_up = False

        self.docker_client = None

        for instance in self.instances.values():
            instance.docker_client = None
            instance.ip_address = None
            instance.client = None


    def get_kazoo_client(self, zoo_instance_name):
        zk = KazooClient(hosts=self.get_instance_ip(zoo_instance_name))
        zk.start()
        return zk


    def run_kazoo_commands_with_retries(self, kazoo_callback, zoo_instance_name = 'zoo1', repeats=1, sleep_for=1):
        for i in range(repeats - 1):
            try:
                kazoo_callback(self.get_kazoo_client(zoo_instance_name))
                return
            except KazooException as e:
                print repr(e)
                time.sleep(sleep_for)

        kazoo_callback(self.get_kazoo_client(zoo_instance_name))


    def add_zookeeper_startup_command(self, command):
        self.pre_zookeeper_commands.append(command)


DOCKER_COMPOSE_TEMPLATE = '''
version: '2'
services:
    {name}:
        image: ubuntu:14.04
        hostname: {hostname}
        user: '{uid}'
        volumes:
            - {binary_path}:/usr/bin/clickhouse:ro
            - {configs_dir}:/etc/clickhouse-server/
            - {db_dir}:/var/lib/clickhouse/
            - {logs_dir}:/var/log/clickhouse-server/
        entrypoint:
            -  /usr/bin/clickhouse
            -  server
            -  --config-file=/etc/clickhouse-server/config.xml
            -  --log-file=/var/log/clickhouse-server/clickhouse-server.log
            -  --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log
        depends_on: {depends_on}
        env_file:
            - {env_file}
'''


class ClickHouseInstance:
    def __init__(
            self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
            with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None, env_variables={}):

        self.name = name
        self.base_cmd = cluster.base_cmd[:]
        self.docker_id = cluster.get_instance_docker_id(self.name)
        self.cluster = cluster
        self.hostname = hostname if hostname is not None else self.name

        self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
        self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
        self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
        self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
        self.macros = macros if macros is not None else {}
        self.with_zookeeper = with_zookeeper
        self.zookeeper_config_path = zookeeper_config_path

        self.base_configs_dir = base_configs_dir
        self.server_bin_path = server_bin_path

        self.with_mysql = with_mysql
        self.with_kafka = with_kafka

        self.path = p.join(self.cluster.instances_dir, name)
        self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
        self.env_variables = env_variables

        self.docker_client = None
        self.ip_address = None
        self.client = None
        self.default_timeout = 20.0 # 20 sec

    # Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
    def query(self, *args, **kwargs):
        return self.client.query(*args, **kwargs)

    # As query() but doesn't wait response and returns response handler
    def get_query_request(self, *args, **kwargs):
        return self.client.get_query_request(*args, **kwargs)


    def exec_in_container(self, cmd, **kwargs):
        container = self.get_docker_handle()
        exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
        output = self.docker_client.api.exec_start(exec_id, detach=False)

        output = output.decode('utf8')
        exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode']
        if exit_code:
            raise Exception('Cmd "{}" failed! Return code {}. Output: {}'.format(' '.join(cmd), exit_code, output))
        return output


    def get_docker_handle(self):
        return self.docker_client.containers.get(self.docker_id)


    def stop(self):
        self.get_docker_handle().stop(self.default_timeout)


    def start(self):
        self.get_docker_handle().start()


    def wait_for_start(self, deadline=None, timeout=None):
        start_time = time.time()

        if timeout is not None:
            deadline = start_time + timeout

        while True:
            handle = self.get_docker_handle()
            status = handle.status;
            if status == 'exited':
                raise Exception("Instance `{}' failed to start. Container status: {}, logs: {}".format(self.name, status, handle.logs()))

            current_time = time.time()
            time_left = deadline - current_time
            if deadline is not None and current_time >= deadline:
                raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
                                "Container status: {}".format(self.name, self.ip_address, status))

            # Repeatedly poll the instance address until there is something that listens there.
            # Usually it means that ClickHouse is ready to accept queries.
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(time_left)
                sock.connect((self.ip_address, 9000))
                return
            except socket.timeout:
                continue
            except socket.error as e:
                if e.errno == errno.ECONNREFUSED:
                    time.sleep(0.1)
                else:
                    raise
            finally:
                sock.close()


    @staticmethod
    def dict_to_xml(dictionary):
        xml_str = dicttoxml(dictionary, custom_root="yandex", attr_type=False)
        return xml.dom.minidom.parseString(xml_str).toprettyxml()


    def create_dir(self, destroy_dir=True):
        """Create the instance directory and all the needed files there."""

        if destroy_dir:
            self.destroy_dir()
        elif p.exists(self.path):
            return

        os.makedirs(self.path)

        configs_dir = p.abspath(p.join(self.path, 'configs'))
        os.mkdir(configs_dir)

        shutil.copy(p.join(self.base_configs_dir, 'config.xml'), configs_dir)
        shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir)

        config_d_dir = p.abspath(p.join(configs_dir, 'config.d'))
        users_d_dir = p.abspath(p.join(configs_dir, 'users.d'))
        os.mkdir(config_d_dir)
        os.mkdir(users_d_dir)

        shutil.copy(p.join(HELPERS_DIR, 'common_instance_config.xml'), config_d_dir)

        # Generate and write macros file
        macros = self.macros.copy()
        macros['instance'] = self.name
        with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config:
            macros_config.write(self.dict_to_xml({"macros" : macros}))

        # Put ZooKeeper config
        if self.with_zookeeper:
            shutil.copy(self.zookeeper_config_path, config_d_dir)

        # Copy config dir
        if self.custom_config_dir:
            distutils.dir_util.copy_tree(self.custom_config_dir, configs_dir)

        # Copy config.d configs
        for path in self.custom_main_config_paths:
            shutil.copy(path, config_d_dir)

        # Copy users.d configs
        for path in self.custom_user_config_paths:
            shutil.copy(path, users_d_dir)

        db_dir = p.abspath(p.join(self.path, 'database'))
        os.mkdir(db_dir)
        if self.clickhouse_path_dir is not None:
            distutils.dir_util.copy_tree(self.clickhouse_path_dir, db_dir)

        logs_dir = p.abspath(p.join(self.path, 'logs'))
        os.mkdir(logs_dir)

        depends_on = []

        if self.with_mysql:
            depends_on.append("mysql1")

        if self.with_kafka:
            depends_on.append("kafka1")

        if self.with_zookeeper:
            depends_on.append("zoo1")
            depends_on.append("zoo2")
            depends_on.append("zoo3")

        env_file = _create_env_file(os.path.dirname(self.docker_compose_path), self.env_variables)

        with open(self.docker_compose_path, 'w') as docker_compose:
            docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
                name=self.name,
                hostname=self.hostname,
                uid=os.getuid(),
                binary_path=self.server_bin_path,
                configs_dir=configs_dir,
                config_d_dir=config_d_dir,
                db_dir=db_dir,
                logs_dir=logs_dir,
                depends_on=str(depends_on),
                env_file=env_file))


    def destroy_dir(self):
        if p.exists(self.path):
            shutil.rmtree(self.path)
